---
layout: post
date: 2024-05-09
title: "Writing a task scheduler in Zig"
description: "How a mutex, a condition variable and a priority queue can be used to build a simple task scheduler in Zig."
tags: [zig]
---
<p>I'm working on an application that needs the ability to schedule tasks. Many applications have a similar need, but requirements can vary greatly. Advanced cases might require persistence and distribution, typically depending on external systems (like a database or queue) to do much of the heavy lifting. My needs are simpler: I don't have a huge variety of tasks or a high number of them.</p>

<aside><p>If you're just looking for a working implementation, I've added this to my <a href="https://github.com/karlseguin/zul">Zig Utility Library (zul)</a>, with <a href="https://www.goblgobl.com/zul/scheduler/">documentation</a>.</p></aside>

<h3>Thread-Per-Task</h3>
<p>At its simplest, we could use a thread with <code>sleep</code> to schedule some work at some future point:</p>

{% highlight zig %}
const std = @import("std");

pub fn main() !void {
  const thread = try std.Thread.spawn(.{}, say, .{"hello world!", 3 * std.time.ns_per_s});
  thread.join();
}

fn say(msg: []const u8, delay: u64) void {
  std.time.sleep(delay);
  std.debug.print("{s}\n", .{msg});
}
{% endhighlight %}

<p>Above is a standalone example in case you want to run it, but the same approach could easily be incorporated into a larger application. If you're new to Zig, the parameters to <code>Thread.spawn</code> are probably a little confusing. <code>spawn</code> takes 3 arguments: a configuration structure, the function to run, and the arguments to pass to the function. The first parameter, <code>.{}</code>, is something you'll see often in Zig and is the result of two language features: type inference and default field values. Imagine you have this struct:</p>

{% highlight zig %}
const Config = struct {
  stack_size: usize = 16_384,
  allocator: ?std.mem.Allocator = null,
};
{% endhighlight %}

<p>And this function:</p>

{% highlight zig %}
fn run(config: Config) !void {
  ...
}
{% endhighlight %}

<p>These four calls to <code>run</code> are equivalent:</p>

{% highlight zig %}
// 1 - explicit config and explicit field values
run(Config{
  .stack_size = 16_384,
  .allocator = null,
});

// 2 - explicit Config, using the default field values
run(Config{});

// 3 - implicit (inferred) Config type and explicit field values
run(.{
  .stack_size = 16_384,
  .allocator = null,
});

// 4 - implicit (inferred) Config type, using the default field values
run(.{});
{% endhighlight %}

<p>In short, <code>run(Config{...})</code> and <code>run(.{...})</code> are the same, with the compiler inferring the type of the parameter based on the definition of <code>run</code>.</p>

<p>It's also worth remembering that structure methods are normal functions called with the dot syntax. Above, we specified <code>say</code> as the function <code>spawn</code> should call, along with its arguments. But you could (and often would want to) call a structure's method:</p>

{% highlight zig %}
const std = @import("std");

pub fn main() !void {
  var person = Person{.name = "Vegeta"};
  const thread = try std.Thread.spawn(
    .{},
    Person.say,
    .{&person, "it's over 9000!", 3 * std.time.ns_per_s}
  );
  thread.join();
}

const Person = struct {
  name: []const u8,

  fn say(p: *const Person, msg: []const u8, when: u64) void {
    std.time.sleep(when);
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }
};
{% endhighlight %}

<p>Be warned though that this example greatly increases the risk of introducing thread-unsafe code. Like our original code, this code has two running threads: the main thread and the thread launched by our call to <code>spawn</code>. However, in this version, the two threads have access to the same memory, the address of <code>person</code>. Since our launched thread reads <code>name</code>, if we changed our main thread to write to <code>name</code>, we'd have to introduce some form of protection (for example, by using a mutex).</p>
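<p>As a minimal sketch of that kind of protection, assume we add a <code>mutex</code> field and a hypothetical <code>rename</code> method to <code>Person</code> (neither is part of the example above). Both the read in <code>say</code> and the write in <code>rename</code> take the lock, so they can never overlap:</p>

{% highlight zig %}
const std = @import("std");

const Person = struct {
  name: []const u8,
  // assumption: added for this sketch, guards `name`
  mutex: std.Thread.Mutex = .{},

  fn say(p: *Person, msg: []const u8, when: u64) void {
    std.time.sleep(when);
    p.mutex.lock();
    defer p.mutex.unlock();
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }

  // hypothetical writer, called from another thread
  fn rename(p: *Person, name: []const u8) void {
    p.mutex.lock();
    defer p.mutex.unlock();
    p.name = name;
  }
};

pub fn main() !void {
  var person = Person{.name = "Vegeta"};
  const thread = try std.Thread.spawn(
    .{},
    Person.say,
    .{&person, "it's over 9000!", 1 * std.time.ns_per_s}
  );
  // safe: the launched thread only reads `name` while holding the mutex
  person.rename("Goku");
  thread.join();
}
{% endhighlight %}

<p>Note that <code>say</code> now takes a <code>*Person</code> rather than a <code>*const Person</code>, since locking the mutex mutates it.</p>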

<p>The examples above all call <code>thread.join();</code>. When we start a thread, we should call either <code>detach</code> or <code>join</code>. These don't have to be called immediately. The thread which calls <code>join</code> blocks until the joined thread ends. Conversely, calling <code>detach</code> allows the calling thread to continue. If you replace <code>thread.join()</code> with <code>thread.detach()</code>, the program will exit before the launched thread has time to finish executing. This is because the thread that calls <code>detach</code> is, in this case, our main thread, and after calling <code>detach</code> it continues to execute normally. For this program, that means reaching the end of <code>main</code> and exiting. Programs don't wait for all threads to finish; in this regard, the main / original thread is special.</p>

<p>As a final point, a <code>while (true)</code> loop can be added to run a recurring job:</p>

{% highlight zig %}
fn say(p: *const Person, msg: []const u8, when: u64) void {
  while (true) {
    std.time.sleep(when);
    std.debug.print("{s} said: {s}\n", .{p.name, msg});
  }
}
{% endhighlight %}
<h3>Naive Scheduler</h3>
<p>If we're doing more than firing the occasional one-off task, we might run into some limitations with the above approach. Maybe we want some insights/metrics into the pending tasks, such as how many we have and when the next one is scheduled to run. If we have a lot of tasks, or a dynamic number of tasks, it might not be ideal to dedicate a whole thread to each.</p>

<p>To start solving these requirements, we'll create a <code>Scheduler</code> that can take an arbitrary number of tasks each with their own schedule, and execute them within a single thread. If necessary, we could add a thread pool, but we'll start with a single thread.</p>

<p>We can start with an implementation that takes our above simple example and extends it with a queue of tasks to be run. Before we do that though, let's move beyond our hardcoded <code>say</code> function and use something more flexible. Specifically, we'll use a tagged union which will be application-specific:</p>

{% highlight zig %}
const Task = union(enum) {
  say: Say,
  db_cleaner: void,

  const Say = struct {
    person: *Person,
    msg: []const u8,
  };

  pub fn run(task: Task) void {
    switch (task) {
      .say => |s| std.debug.print("{s} said: {s}\n", .{s.person.name, s.msg}),
      .db_cleaner => {
        std.debug.print("Cleaning old records from the database\n", .{});
        // TODO,
      }
    }
  }
};
{% endhighlight %}

<p>In this case, we support two (dummy) tasks. The <code>run</code> method is important as this will be the function our scheduler will call. The result will be a generic scheduler that can operate on any type of task or sets of tasks.</p>
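<p>To make the "any type of task" idea concrete, here's a small sketch. The <code>runAll</code> function is a hypothetical stand-in for the scheduler (it isn't part of the final code): it only requires that <code>T</code> has a <code>run</code> method, which the compiler checks when the function is instantiated.</p>

{% highlight zig %}
const std = @import("std");

const Person = struct {
  name: []const u8,
};

const Task = union(enum) {
  say: Say,
  db_cleaner: void,

  const Say = struct {
    person: *Person,
    msg: []const u8,
  };

  pub fn run(task: Task) void {
    switch (task) {
      .say => |s| std.debug.print("{s} said: {s}\n", .{s.person.name, s.msg}),
      .db_cleaner => std.debug.print("Cleaning old records from the database\n", .{}),
    }
  }
};

// hypothetical stand-in for the scheduler: works with any T that has `run`
fn runAll(comptime T: type, tasks: []const T) void {
  for (tasks) |task| task.run();
}

pub fn main() void {
  var person = Person{.name = "Vegeta"};
  const tasks = [_]Task{
    .{.say = .{.person = &person, .msg = "over 9000!"}},
    .{.db_cleaner = {}},
  };
  runAll(Task, &tasks);
}
{% endhighlight %}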

<p>Next, we create a <code>Job</code>, which is just a <code>Task</code> with a <code>run_at</code> field which is the unix timestamp (in milliseconds) that the task should be run at:</p>

{% highlight zig %}
fn Job(comptime T: type) type {
  return struct {
    task: T,
    run_at: i64,
  };
}
{% endhighlight %}

<p>The last component is our actual <code>Scheduler</code>. We'll build it up over time, but we can start with a skeleton:</p>

{% highlight zig %}
fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue, // Queue is defined a few lines down
    allocator: Allocator,

    // "Self" is a nicer name than "Scheduler(T)"
    const Self = @This();

    // "Queue" is a nicer name than "std.DoublyLinkedList(Job(T))"
    const Queue = std.DoublyLinkedList(Job(T));

    pub fn init(allocator: Allocator) Self {
      return .{
        .queue = Queue{},
        .allocator = allocator,
      };
    }

    pub fn deinit(self: *Self) void {
      // free any linked list nodes we have in our queue
      // for any jobs we haven't run yet.
      while (self.queue.pop()) |node| {
        self.allocator.destroy(node);
      }
    }
  };
}
{% endhighlight %}

<p>Our scheduler will use a doubly linked list for its queue. Later, we'll need to change this, but for now, the ability to add new tasks at the end of the queue and process tasks at the start of the queue is good enough. Our scheduler is missing two parts: the ability to schedule a task and the code to actually run those tasks. Keeping in mind that this is our naive implementation, the first part is easy to implement:</p>

{% highlight zig %}
fn Scheduler(comptime T: type) type {
  return struct {
    ...

    pub fn schedule(self: *Self, task: T, run_at: i64) !void {
      const node = try self.allocator.create(Queue.Node);
      node.data = Job(T){
        .task = task,
        .run_at = run_at,
      };
      self.queue.append(node);
    }

    pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
      return self.schedule(task, std.time.milliTimestamp() + ms);
    }
  };
}
{% endhighlight %}

<p>We've added two functions. The first, <code>schedule</code>, takes an absolute timestamp. The second, <code>scheduleIn</code>, takes a relative time (from now) in milliseconds. We take an <code>i64</code> rather than a <code>u64</code> because this is what <code>std.time.milliTimestamp()</code> returns (in order to represent dates prior to Jan 1, 1970), so it makes our methods easier to use.</p>

<p>The code to run the tasks is more complicated. We'll start with a buggy implementation:</p>

{% highlight zig %}
fn Scheduler(comptime T: type) type {
  return struct {
    ...
    // This is the public method that's called to start the scheduler.
    // It launches a new thread.
    pub fn start(self: *Self) !void {
      const thread = try std.Thread.spawn(.{}, Self.run, .{self});
      thread.detach();
    }

    fn run(self: *Self) void {
      while (true) {
        const ms = self.timeUntilNextTask() orelse 1000;
        std.time.sleep(@intCast(ms * std.time.ns_per_ms));

        if (self.queue.popFirst()) |node| {
          // make sure we free the linked list node once we're
          // done running the task
          defer self.allocator.destroy(node);

          // run the task! The node data is the Job, which we set in
          // the schedule method above
          node.data.task.run();
        }
      }
    }

    fn timeUntilNextTask(self: *Self) ?i64 {
      if (self.queue.first) |first| {
        // milliseconds from now until the first job is due
        return first.data.run_at - std.time.milliTimestamp();
      }
      return null;
    }
  };
}
{% endhighlight %}

<p>If you were to put it all together, it would probably work, but it has a lot of issues, from incorrect functionality to being thread-unsafe. One obvious issue is our handling of an empty queue. Right now, we sleep for 1 second and see if there are any tasks to run. Consider what that means for the following usage:</p>

{% highlight zig %}
var s = Scheduler(Task).init(allocator);
defer s.deinit();

try s.start();
var person = Person{.name = "Vegeta"};
try s.scheduleIn(.{.say = .{.person = &person, .msg = "over 9000!"}}, 8000);
{% endhighlight %}

<p>Do you see the issue? We schedule the job to run in 8 seconds, but there's a good chance that <code>run</code> will sleep for 1 second (due to the queue being empty), wake up and process the newly added task immediately. To fix this, we need to double check the first job:</p>

{% highlight zig %}
while (true) {
  const ms = self.timeUntilNextTask() orelse 1000;
  std.time.sleep(@intCast(ms * std.time.ns_per_ms));

  const first = self.queue.first orelse {
    // queue is still empty, go back to the start of the loop
    // and try again
    continue;
  };
  const job = first.data;
  if (job.run_at > std.time.milliTimestamp()) {
    // we woke up too early, go back to the start of the loop
    // and try again.
    continue;
  }
  self.queue.remove(first);
  self.allocator.destroy(first);
  job.task.run();
}
{% endhighlight %}

<p>But this is hardly the only timing issue we have. We also need to handle the case where <code>timeUntilNextTask</code> returns a negative value. More seriously, our scheduler assumes that the tasks are inserted in the order that they should be run. If we schedule a task to be run in 1 hour, then schedule a task to be run in 10 seconds, our scheduler will sleep for 1 hour and our 2nd task will be run much too late. Finally, our handling of the empty queue isn't ideal; it would be nice to come up with an alternative to waking the thread every second.</p>
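<p>The ordering problem is easy to reproduce in isolation. The sketch below uses a simplified <code>Job</code> with a <code>name</code> field, fixed illustrative <code>run_at</code> values, and a hypothetical <code>naiveNext</code> helper (none of which are part of the scheduler) to show which job the linked list puts first:</p>

{% highlight zig %}
const std = @import("std");

// simplified job for this sketch; run_at values are illustrative only
const Job = struct {
  name: []const u8,
  run_at: i64,
};

const Queue = std.DoublyLinkedList(Job);

// hypothetical helper: the job a naive scheduler would sleep on,
// which is simply whatever was appended first
fn naiveNext(queue: *const Queue) ?Job {
  const first = queue.first orelse return null;
  return first.data;
}

pub fn main() void {
  var queue = Queue{};
  var hour = Queue.Node{.data = .{.name = "in 1 hour", .run_at = 3_600_000}};
  var ten = Queue.Node{.data = .{.name = "in 10 seconds", .run_at = 10_000}};
  queue.append(&hour);
  queue.append(&ten);

  // prints "sleeping until: in 1 hour" even though the other job is due sooner
  if (naiveNext(&queue)) |job| {
    std.debug.print("sleeping until: {s}\n", .{job.name});
  }
}
{% endhighlight %}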

<p>The final major issue with this implementation is that it isn't thread-safe. The <code>run</code> method runs in its own thread, started by the <code>start</code> method. The <code>schedule</code> and <code>scheduleIn</code> methods are meant to be called from the main thread, or any other thread. However, both share and operate on the same <code>queue</code> field. This type of access has to be synchronized. If one thread happens to <code>append</code> to the <code>queue</code> via a call to <code>schedule</code>, while <code>run</code> calls <code>remove</code>, the behavior is undefined; the best possible outcome is a crash.</p>

<p>The threading issue can be fixed by adding a mutex, but the scheduling issue requires more fundamental changes. So while this was a good first step, it's time to look at a more correct implementation.</p>

<h3>Scheduler</h3>
<p>At a high level, the shortcomings of our naive implementation can be solved by:
<ol>
  <li>Using a sortable data structure to order pending jobs by their <code>run_at</code> field
  <li>Using a thread-safe signal to notify our scheduler when new jobs are added
  <li>Ensuring that all access to our shared data structure is synchronized by a mutex
</ol>

<p>We begin by replacing our <code>std.DoublyLinkedList</code> with an <code>std.PriorityQueue</code>. Like our linked list, the priority queue allows us to append new jobs and pop existing ones off. It also requires that we provide a function to compare two entries to control how entries are ordered. We begin with a new skeleton, which is similar to what we started with before:</p>

{% highlight zig %}
fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue,
    mutex: std.Thread.Mutex,

    const Self = @This();
    const Queue = std.PriorityQueue(Job(T), void, compare);

    fn compare(_: void, a: Job(T), b: Job(T)) std.math.Order {
      return std.math.order(a.run_at, b.run_at);
    }

    pub fn init(allocator: Allocator) Self {
      return .{
        .mutex = .{},
        .queue = Queue.init(allocator, {}),
      };
    }

    pub fn deinit(self: *Self) void {
      self.queue.deinit();
    }
  };
}
{% endhighlight %}

<p>Our <code>Queue</code> has changed type. Not only does the <code>PriorityQueue</code> take the type of value to store (as <code>DoublyLinkedList</code> does), it also takes a context and a function to compare two nodes. The context exists for cases where comparison is stateful. Whatever context we pass to <code>Queue.init</code> gets passed as the first parameter to the <code>compare</code> function. The context can be used to inform how two values should be compared. In many cases, including our own, there will be no context, and thus we specify a <code>void</code> context type and pass a void context value (<code>{}</code>) to <code>init</code>.</p>
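<p>Here's a small standalone sketch of that ordering, using a simplified <code>Job</code> with a <code>name</code> field and fixed illustrative <code>run_at</code> values (both are assumptions made for the example). Jobs are added out of order but come back sorted by <code>run_at</code>:</p>

{% highlight zig %}
const std = @import("std");

// simplified job for this sketch; run_at values are illustrative only
const Job = struct {
  name: []const u8,
  run_at: i64,
};

const Queue = std.PriorityQueue(Job, void, compare);

// the smallest run_at is removed first
fn compare(_: void, a: Job, b: Job) std.math.Order {
  return std.math.order(a.run_at, b.run_at);
}

pub fn main() !void {
  var queue = Queue.init(std.heap.page_allocator, {});
  defer queue.deinit();

  try queue.add(.{.name = "in 1 hour", .run_at = 3_600_000});
  try queue.add(.{.name = "in 10 seconds", .run_at = 10_000});
  try queue.add(.{.name = "in 5 minutes", .run_at = 300_000});

  // prints "in 10 seconds", then "in 5 minutes", then "in 1 hour"
  while (queue.removeOrNull()) |job| {
    std.debug.print("{s}\n", .{job.name});
  }
}
{% endhighlight %}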

<p>We've also added a <code>std.Thread.Mutex</code>, which we'll use in the following code to synchronize access to the <code>queue</code>. The mutex is initialized using the following syntax: <code>.mutex = .{}</code>. This is the same syntax and the same behavior as <code>Thread.spawn</code>'s <code>config</code> parameter, again relying on Zig's type inference and default field values.</p>

<p>For now, our <code>schedule</code> method is similar to the previous one:</p>

{% highlight zig %}
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };
  self.mutex.lock();
  defer self.mutex.unlock();
  try self.queue.add(job);
}

// unchanged
pub fn scheduleIn(self: *Self, task: T, ms: i64) !void {
  return self.schedule(task, std.time.milliTimestamp() + ms);
}
{% endhighlight %}

<p>When using the <code>DoublyLinkedList</code>, we were responsible for managing the <code>Node</code> memory. We had to create the nodes and free them, both in <code>deinit</code> and after running the task in <code>run</code>. The <code>PriorityQueue</code> has a different interface and manages its nodes internally. Discounting the introduction of our <code>mutex</code>, this makes both our <code>deinit</code> and <code>schedule</code> methods simpler. In fact, you might have noticed that our <code>Scheduler(T)</code> no longer holds a reference to the allocator since we no longer manage any memory directly ourselves.</p>

<aside><p>You might wonder why <code>DoublyLinkedList</code> doesn't manage its own memory. I think it's because it doesn't have to. Unlike the <code>PriorityQueue</code> or <code>std.ArrayList</code> or one of the various <a href="/Zigs-HashMap-Part-1/">HashMaps</a>, a linked list (single or double) never has to move its nodes. It might mutate the nodes, specifically changing the <code>next</code> and <code>prev</code> pointers, but it doesn't relocate them in memory. As <code>PriorityQueues</code>, <code>ArrayLists</code> and <code>HashMaps</code> grow, the internal representation often has to be moved (to a larger contiguous block of memory).</p></aside>

<p>Although this code is useless without the <code>start</code> and <code>run</code> methods, it's worth reiterating how important synchronizing access to <code>self.queue</code> is. Even with <code>schedule</code> alone, if our application is multi-threaded, two threads could call <code>schedule</code> at the same time and thus attempt to mutate <code>self.queue</code> at the same time, which would be undefined behavior.</p>

<p>Switching to a <code>PriorityQueue</code> gets us halfway there. We can now be sure that the first task in our <code>queue</code> is the one that should be run soonest. But we still face the issue of having tasks scheduled out of order. The core of our issue is the call to <code>std.time.sleep</code> in <code>run</code>. Even if new tasks get properly ordered within our queue, once our <code>run</code> thread goes to sleep, there's nothing we can do. Recall our previous code that looked something like:</p>

{% highlight zig %}
const ms = self.timeUntilNextTask() orelse 1000;
std.time.sleep(@intCast(ms * std.time.ns_per_ms));
{% endhighlight %}

<p>Once we go to sleep, newly added tasks cannot be processed until this sleep ends. What we need is a way to wake up the thread when a new task gets scheduled. Better yet, we only need to wake the thread up if a newly scheduled task needs to run before the current sleep will end. The behavior we want is:

<ol>
  <li>We start the scheduler. The queue is empty. It goes to sleep "forever".
  <li>We schedule a task to run in 10 seconds. We wake the scheduler up, it detects that the next job should run in 10 seconds, so goes to sleep for 10 seconds.
  <li>We immediately schedule a new task to run in 5 seconds. We wake the scheduler up, it detects that the next job should run in 5 seconds, so goes to sleep for 5 seconds.
  <li>We immediately schedule yet another job, but this one to run in 7 seconds. We don't need to wake the scheduler up, because this job is meant to run after our next one (7 > 5).
</ol>
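<p>The decision at the heart of those steps is small enough to sketch as a pure function. The name <code>shouldWake</code> and its parameters are hypothetical; <code>earliest</code> stands for the <code>run_at</code> of the job currently at the front of the queue, or <code>null</code> when the queue is empty, and the values are relative milliseconds used purely for illustration:</p>

{% highlight zig %}
const std = @import("std");

// hypothetical helper: should adding a job due at `run_at` wake the scheduler?
fn shouldWake(earliest: ?i64, run_at: i64) bool {
  // empty queue: the scheduler is sleeping "forever", so always wake it
  const next = earliest orelse return true;
  // otherwise only wake it if the new job is due before the current next one
  return run_at < next;
}

pub fn main() void {
  std.debug.print("{}\n", .{shouldWake(null, 10_000)});  // step 2: true
  std.debug.print("{}\n", .{shouldWake(10_000, 5_000)}); // step 3: true
  std.debug.print("{}\n", .{shouldWake(5_000, 7_000)});  // step 4: false
}
{% endhighlight %}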

<p>Again, the <code>PriorityQueue</code> solves the issue of ordering. To solve the issue of waking up our scheduler, we turn to <code>std.Thread.Condition</code>. A condition variable provides a thread-safe way for threads to signal each other. Condition variables interact in specific ways with a mutex to provide certain guarantees, and this interaction is important to understand. We'll begin by adding our condition variable:</p>

{% highlight zig %}
fn Scheduler(comptime T: type) type {
  return struct {
    queue: Queue,
    mutex: std.Thread.Mutex,
    cond: std.Thread.Condition,

    pub fn init(allocator: Allocator) Self {
      return .{
        .cond = .{},
        .mutex = .{},
        .queue = Queue.init(allocator, {}),
      };
    }

    ...
  };
}
{% endhighlight %}

<p>We'll take small steps, starting with part of our <code>run</code> method:</p>

{% highlight zig %}
fn run(self: *Self) void {
  while (true) {
    self.mutex.lock();
    while (self.queue.peek() == null) {
      self.cond.wait(&self.mutex);
    }
    // TODO
  }
}
{% endhighlight %}

<p>There's a bit of magic and beauty in the above code. The first thing that should be obvious is that we need to lock <code>mutex</code> before we can check if <code>queue</code> is empty. Again, all access to <code>self.queue</code> has to be synchronized. But if you're unfamiliar with condition variables, you might have a couple of questions. For example, why don't we unlock <code>mutex</code>? Why do we check if the queue is empty in a loop? How is <code>cond.wait</code> different than <code>sleep</code>?</p>

<p>The difference between <code>cond.wait</code> and <code>sleep</code> is that <code>cond.wait</code> blocks until it receives a signal. This is the mechanism we need in order to let newly scheduled tasks wake our scheduler. This isn't quite our final solution. We actually need a mix of the two behaviors: sleeping until it's time to run the next task OR until we receive a signal indicating newly added tasks. For this, we'll eventually use <code>cond.timedWait</code>, which does exactly what we need, but we'll get to that in a bit.</p>

<p>As for the first question, <code>cond.wait</code> and <code>cond.timedWait</code> expect a locked mutex, and internally release the mutex before going to sleep. Furthermore, the mutex is always re-locked before the call returns. By passing in a locked mutex, and receiving a locked mutex, the condition variable can be set up atomically, without the risk of another thread slipping in.</p>

<p>As for the while loop, it serves two purposes. First, it's possible for multiple threads to wait on the same condition and be woken up by the same signal (called a broadcast in that case). Second, condition variables can be <a href="https://en.wikipedia.org/wiki/Spurious_wakeup">victims of spurious wakeups</a>, that is, being woken up when they really shouldn't be. Both cases are solved by re-checking the condition, which is what our inner <code>while</code> loop does. Keep in mind that when we do wake up, the <code>mutex</code> will be locked for us. So whenever you see a call to <code>cond.wait</code> or <code>cond.timedWait</code>, your mental model should be something like:</p>

{% highlight zig %}
fn wait(c: *std.Thread.Condition, mutex: *std.Thread.Mutex) void {
  // do some setup
  // ...

  mutex.unlock();

  // whatever happens, we'll always return with this locked
  defer mutex.lock();

  // wait for signal
  // or timeout if calling timedWait
  // ...
}
{% endhighlight %}
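<p>Stripped of the scheduler, the whole pattern fits in a few lines. This is only a sketch: the <code>Shared</code> struct, its <code>ready</code> flag and both method names are made up for the example. The waiting side re-checks its condition in a loop, and the signaling side changes the condition while holding the same mutex:</p>

{% highlight zig %}
const std = @import("std");

// hypothetical type for this sketch
const Shared = struct {
  mutex: std.Thread.Mutex = .{},
  cond: std.Thread.Condition = .{},
  ready: bool = false,

  fn waitUntilReady(self: *Shared) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    // re-check after every wakeup, spurious or not
    while (!self.ready) {
      // releases the mutex while blocked, re-locks it before returning
      self.cond.wait(&self.mutex);
    }
  }

  fn markReady(self: *Shared) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    self.ready = true;
    self.cond.signal();
  }
};

pub fn main() !void {
  var shared = Shared{};
  const thread = try std.Thread.spawn(.{}, Shared.waitUntilReady, .{&shared});
  shared.markReady();
  thread.join();
  std.debug.print("worker saw ready\n", .{});
}
{% endhighlight %}

<p>If <code>markReady</code> happens to run before the worker ever reaches <code>cond.wait</code>, the signal is lost, but that's fine: the worker checks <code>ready</code> first and never blocks. This is why the condition being waited on lives in shared state, protected by the mutex, rather than in the signal itself.</p>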

<p>Having <code>run</code> wait for a signal is only useful if we actually have something that sends a signal. Thus, we change <code>schedule</code>:</p>

{% highlight zig %}
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  self.mutex.lock();
  defer self.mutex.unlock();
  try self.queue.add(job);
  self.cond.signal(); // <-- added this
}
{% endhighlight %}

<p>The only thing we've changed is to add the last line. This will wake up <code>run</code> whenever a task is added. But we can do better than this. As we discussed earlier, we only want to send a signal under two conditions: the queue was empty or the new task is meant to run before the previous earliest task:</p>

{% highlight zig %}
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  self.mutex.lock();
  defer self.mutex.unlock();

  var reschedule = false;
  if (self.queue.peek()) |*next| {
    if (run_at < next.run_at) {
      reschedule = true;
    }
  } else {
    // the queue was empty
    reschedule = true;
  }

  try self.queue.add(job);

  if (reschedule) {
    // only send the signal under the above two conditions
    self.cond.signal();
  }
}
{% endhighlight %}

<p>This is the final implementation of <code>schedule</code>. But I do want to highlight the fact that <code>cond.signal</code> doesn't have to be called under lock. We could rewrite the above code like so:</p>

{% highlight zig %}
pub fn schedule(self: *Self, task: T, run_at: i64) !void {
  const job = Job(T){
    .task = task,
    .run_at = run_at,
  };

  var reschedule = false;
  {
    self.mutex.lock();
    defer self.mutex.unlock();

    if (self.queue.peek()) |*next| {
      if (run_at < next.run_at) {
        reschedule = true;
      }
    } else {
      reschedule = true;
    }
    try self.queue.add(job);

    // mutex is unlocked here by the above defer
  }

  if (reschedule) {
    // Our new job is scheduled before our previous earliest job
    // (or we had no previous jobs)
    // We need to reset our schedule
    self.cond.signal();
  }
}
{% endhighlight %}

<p>The <code>self.cond.signal()</code> call is no longer called under lock. This is a tiny bit more efficient since we hold the lock for a shorter amount of time. But it does highlight something important: in Zig, <code>defer</code> is run at the end of the current block. The above would be riskier to do in Go where <code>defer</code> is run at the end of the function. Instead, in Go, we wouldn't be able to use <code>defer</code> and would have to place calls to <code>unlock</code> at the correct places, keeping in mind that <code>self.queue.add</code> could fail, and new control flow could be added in the future.</p>
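<p>If block-scoped <code>defer</code> is new to you, this tiny standalone sketch shows the order in which things run:</p>

{% highlight zig %}
const std = @import("std");

pub fn main() void {
  {
    // runs when this inner block ends, not when main returns
    defer std.debug.print("2: end of inner block\n", .{});
    std.debug.print("1: inside inner block\n", .{});
  }
  std.debug.print("3: after inner block\n", .{});
}
{% endhighlight %}

<p>The lines are printed in the order 1, 2, 3, which is exactly why the mutex in <code>schedule</code> is already unlocked by the time we reach <code>cond.signal</code>.</p>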

<p>With <code>schedule</code> finished, all that's left is to fully implement <code>run</code>:</p>

{% highlight zig %}
fn run(self: *Self) void {
  while (true) {
    self.mutex.lock();
    if (self.timeUntilNextTask()) |ms| {
      if (ms > 0) {
        const ns: u64 = @intCast(std.time.ns_per_ms * ms);
        self.cond.timedWait(&self.mutex, ns) catch |err| {
          // only error possible from timedWait
          std.debug.assert(err == error.Timeout);
        };
      }
    } else {
      self.cond.wait(&self.mutex);
    }
    // at this point, mutex is locked

    // possible that we were victims of a spurious wakeup
    // so we must always be defensive

    // do we even have a job...?
    const next = self.queue.peek() orelse {
      // release the lock before looping: the top of the loop locks again
      self.mutex.unlock();
      continue;
    };

    // ... and if so, should it be run now?
    if (next.run_at > std.time.milliTimestamp()) {
      self.mutex.unlock();
      continue;
    }

    // ok, this should be run
    const job = self.queue.remove();

    // don't want to hold the lock while processing the job
    self.mutex.unlock();

    // run it
    job.task.run();
  }
}

fn timeUntilNextTask(self: *Self) ?i64 {
  // assumed to be called while mutex.lock is held
  if (self.queue.peek()) |*next| {
    return next.run_at - std.time.milliTimestamp();
  }
  return null;
}
{% endhighlight %}

<p>This is a big leap, but it mostly combines everything we've seen so far. When the queue is empty we use <code>cond.wait</code>, meaning we'll sleep until a task is scheduled and <code>cond.signal</code> is called. When the queue isn't empty, we use <code>cond.timedWait</code>. In this case there are two ways for us to wake up. The first is, again, via a signal sent by the <code>schedule</code> method. The second is if our <code>timedWait</code> times out, indicating that we have a task to run.</p>

<p>Once we unblock, the logic is the same in either case. First, we're defensive and make sure that (a) we do indeed have a task and (b) it should be run. We can then remove the task from the queue and unlock the mutex while the task is run. Unlocking here is important as it allows other threads to schedule new tasks.</p>
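<p>To see the timeout path of <code>cond.timedWait</code> on its own, here's a minimal sketch where nothing ever sends a signal. The 50ms timeout is an arbitrary value picked for the example:</p>

{% highlight zig %}
const std = @import("std");

pub fn main() void {
  var mutex = std.Thread.Mutex{};
  var cond = std.Thread.Condition{};

  // timedWait, like wait, expects the mutex to be locked
  mutex.lock();
  defer mutex.unlock();

  const start = std.time.milliTimestamp();
  cond.timedWait(&mutex, 50 * std.time.ns_per_ms) catch |err| {
    // the expected path here: nobody signaled, so the wait timed out
    std.debug.assert(err == error.Timeout);
    const elapsed = std.time.milliTimestamp() - start;
    std.debug.print("timed out after roughly {d}ms\n", .{elapsed});
    return;
  };

  // no error means we were woken up; with no signaler, that can only
  // be a spurious wakeup
  std.debug.print("woken up before the timeout\n", .{});
}
{% endhighlight %}

<p>Either way the mutex is locked again when <code>timedWait</code> returns, which is why <code>run</code> can go straight to inspecting the queue regardless of how it woke up.</p>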

<h3>Conclusion</h3>
<p>There are a lot of features we might want to add, for example metrics or the ability to stop and restart the scheduler. Much like the <code>PriorityQueue</code> takes a context that is passed back to our <code>compare</code> function, it makes sense for our <code>Scheduler(T)</code> to do the same: taking a context on initialization which is passed to each task.</p>

<p>My <a href="https://github.com/karlseguin/zul/blob/master/src/scheduler.zig">Zig Utility Library</a> has the complete implementation that can largely be copied and pasted. And while part of the goal was to write a task scheduler, I hope some readers found insight with respect to multi-threaded programming.</p>
